package com.bw.gmall.app.dws;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.bw.gmall.bean.UserLoginBean;
import com.bw.gmall.utils.DateFormatUtil;
import com.bw.gmall.utils.MyClickHouseUtil;
import com.bw.gmall.utils.MyKafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

public class DwsUserUserLoginWindow {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        String topic = "dwd_traffic_page_log";
        String groupId = "dws_user_login_window_2103a";
        DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(topic, groupId));

        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.flatMap(new FlatMapFunction<String, JSONObject>() {
            @Override
            public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                JSONObject jsonObject = JSON.parseObject(value);
                String uid = jsonObject.getJSONObject("common").getString("uid");
                String lastPageId = jsonObject.getJSONObject("page").getString("last_page_id");
                if (uid != null && (lastPageId == null || lastPageId.equals("login"))) {
                    out.collect(jsonObject);
                }
            }
        });

        SingleOutputStreamOperator<JSONObject> jsonOWWDS = jsonObjDS.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
                            @Override
                            public long extractTimestamp(JSONObject element, long recordTimestamp) {
                                return element.getLong("ts");
                            }
                        }));

        KeyedStream<JSONObject, String> keyedStream = jsonOWWDS.keyBy(a -> a.getJSONObject("common").getString("uid"));

        SingleOutputStreamOperator<UserLoginBean> userLong = (SingleOutputStreamOperator<UserLoginBean>) keyedStream.flatMap(new RichFlatMapFunction<JSONObject, UserLoginBean>() {

            public ValueState<String> lastLongState;

            @Override
            public void open(Configuration parameters) throws Exception {
                lastLongState = getRuntimeContext().getState(new ValueStateDescriptor<String>("last-login", String.class));
            }

            @Override
            public void flatMap(JSONObject value, Collector<UserLoginBean> out) throws Exception {
                String lastLogDT = lastLongState.value();
                Long ts = value.getLong("ts");
                String curDt = DateFormatUtil.toDate(ts);
                long uv = 0L;
                long backUv = 0L;
                if (lastLogDT == null) {
                    uv = 1L;
                    lastLongState.update(curDt);
                } else if (!lastLogDT.equals(curDt)) {
                    uv = 1L;
                    lastLongState.update(curDt);

                    if ((DateFormatUtil.toTs(curDt) - DateFormatUtil.toTs(lastLogDT)) / (24 * 60 * 60 * 1000L) >= 8) {
                        backUv = 1L;
                    }

                    if (uv != 0L) {
                        out.collect(new UserLoginBean("", "", backUv, uv, ts));
                    }
                }
            }
        });
        userLong.print();

        SingleOutputStreamOperator<UserLoginBean> reduce = userLong.windowAll(
                        TumblingEventTimeWindows
                                .of(Time.seconds(10)))
                .reduce(new ReduceFunction<UserLoginBean>() {
                    @Override
                    public UserLoginBean reduce(UserLoginBean value1, UserLoginBean value2) throws Exception {
                        value1.setBackCt(value1.getBackCt() + value2.getBackCt());
                        value1.setUuCt(value1.getUuCt() + value2.getUuCt());
                        return value1;
                    }
                }, new AllWindowFunction<UserLoginBean, UserLoginBean, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow window, Iterable<UserLoginBean> values, Collector<UserLoginBean> out) throws Exception {
                        UserLoginBean next = values.iterator().next();
                        next.setEdt(DateFormatUtil.toYmdHms(window.getEnd()));
                        next.setStt(DateFormatUtil.toYmdHms(window.getStart()));
                        next.setTs(System.currentTimeMillis());
                        out.collect(next);
                    }
                });

        reduce.print();

        reduce.addSink(MyClickHouseUtil.getSinkFunction("insert into dws_user_user_login_window values(?,?,?,?,?)"));


    }
}
